package com.zhang.hadoop.flink.test8;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Example Flink job that detects two-step behavior patterns per user.
 *
 * <p>A keyed stream of user {@link Action}s is connected to a broadcast stream of
 * {@link Pattern}s. For every user, the previous action is remembered in keyed state;
 * whenever the (previous action, current action) pair equals the most recently
 * broadcast pattern, a {@code (userId, pattern)} tuple is emitted.
 *
 * @author zhang yufei
 * @createTime 2022/9/4 11:30
 */
public class BehaviorPatternDetectExample {

    /** Name under which the broadcast state is registered. */
    private static final String PATTERN_STATE_NAME = "pattern";

    /**
     * Builds the descriptor of the broadcast state that holds the current pattern.
     *
     * <p>The broadcasting side ({@code main}) and the consuming side
     * ({@link PatternDetector}) must use an equivalent descriptor, so it is created
     * in exactly one place. The state only ever holds a single entry, stored under
     * the {@code null} key, hence the {@code Void} key type.
     *
     * @return a fresh descriptor for the pattern broadcast state
     */
    private static MapStateDescriptor<Void, Pattern> newPatternDescriptor() {
        return new MapStateDescriptor<>(PATTERN_STATE_NAME, Types.VOID, Types.POJO(Pattern.class));
    }

    /**
     * Builds and runs the demo pipeline, printing every detected match.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // User behavior stream. Every user performs two consecutive actions so that a
        // "previous action" exists and a pattern can actually be matched.
        DataStreamSource<Action> actionStream = env.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "buy")
        );

        // Pattern stream; the broadcast stream is built from it.
        // NOTE(review): the two inputs are read independently, so which pattern is
        // current when a given action is processed may vary between runs.
        DataStreamSource<Pattern> patternStream = env.fromElements(
                new Pattern("login", "pay"),
                new Pattern("login", "buy")
        );

        // Broadcast the patterns using the shared state descriptor.
        BroadcastStream<Pattern> broadcastStream = patternStream.broadcast(newPatternDescriptor());

        // Key the actions by user, connect them with the broadcast patterns and detect matches.
        SingleOutputStreamOperator<Tuple2<String, Pattern>> matches = actionStream
                .keyBy(data -> data.userId)
                .connect(broadcastStream)
                .process(new PatternDetector());

        matches.print();

        env.execute();
    }

    /**
     * Emits {@code (userId, pattern)} whenever a user's previous and current action
     * equal {@code action1} and {@code action2} of the currently broadcast pattern.
     */
    public static class PatternDetector extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {

        /** Descriptor used to look up the broadcast state on both input sides. */
        private final MapStateDescriptor<Void, Pattern> patternDescriptor = newPatternDescriptor();

        /** Keyed state holding the last action of the current user; assigned in {@link #open}. */
        private transient ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action", String.class));
        }

        /**
         * Compares the user's (previous, current) action pair with the current pattern,
         * emits a match if they are equal, then stores the current action as the new
         * previous action.
         */
        @Override
        public void processElement(Action action, ReadOnlyContext readOnlyContext, Collector<Tuple2<String, Pattern>> collector) throws Exception {
            ReadOnlyBroadcastState<Void, Pattern> patternState = readOnlyContext.getBroadcastState(patternDescriptor);
            // Null until the first pattern has been broadcast.
            Pattern pattern = patternState.get(null);
            // Null for the first action of a user.
            String prevAction = prevActionState.value();

            // Guard on the pattern VALUE (the state handle itself is never null) and keep
            // the comparisons null-safe so incomplete records cannot crash the job.
            boolean matched = pattern != null
                    && prevAction != null
                    && prevAction.equals(pattern.action1)
                    && action.action != null
                    && action.action.equals(pattern.action2);
            if (matched) {
                collector.collect(Tuple2.of(readOnlyContext.getCurrentKey(), pattern));
            }

            // Remember the current action for the next element of this user.
            prevActionState.update(action.action);
        }

        /**
         * Replaces the pattern held in broadcast state with the newly received one.
         */
        @Override
        public void processBroadcastElement(Pattern pattern, Context context, Collector<Tuple2<String, Pattern>> collector) throws Exception {
            BroadcastState<Void, Pattern> patternState = context.getBroadcastState(patternDescriptor);
            patternState.put(null, pattern);
        }
    }

    /**
     * A single user action event.
     *
     * <p>Public fields and the no-argument constructor are kept on purpose; the class
     * is used as a plain data object by the pipeline.
     */
    public static class Action {

        public String userId;

        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId='" + userId + '\'' +
                    ", action='" + action + '\'' +
                    '}';
        }
    }

    /**
     * A two-step behavior pattern: {@code action1} immediately followed by {@code action2}.
     *
     * <p>Registered via {@code Types.POJO(Pattern.class)}, so the public fields and the
     * no-argument constructor must stay.
     */
    public static class Pattern {

        public String action1;

        public String action2;

        public Pattern() {
        }

        public Pattern(String action1, String action2) {
            this.action1 = action1;
            this.action2 = action2;
        }

        // Same format as Action#toString so printed matches are readable.
        @Override
        public String toString() {
            return "Pattern{" +
                    "action1='" + action1 + '\'' +
                    ", action2='" + action2 + '\'' +
                    '}';
        }
    }
}
